﻿#include "box_network.hh"
#include "box_channel.hh"
#include "fixed_mem_pool.hh"

#include "../../thirdparty/klogger/klogger/interface/logger.h"
#include "../../thirdparty/knet/include/knet.h"

#include "../config/box_config.hh"

#include "../detail/box_alloc.hh"
#include "../detail/lang_impl.hh"

#include "../repo/src/include/root/rpc_root.h"
#include "../repo/src/include/root/rpc_stub.h"

#include "../util/object_pool.hh"
#include "../util/os_util.hh"
#include "../util/string_util.hh"
#include "../util/box_debug.hh"

#include <cstring>
#include <unordered_map>

// One BoxNetwork pointer per worker thread. The network thread started by
// BoxNetwork::start_worker() points it at its owning instance and resets it
// to nullptr in BoxNetwork::worker_cleanup().
thread_local kratos::service::BoxNetwork *network_ = nullptr;

namespace kratos {
namespace service {

// Empty channel pointer passed to on_listen()/on_connect() on failure and
// returned by get_channel() when the id is unknown.
static std::shared_ptr<BoxChannel> NullChannel;

/**
 * Allocation hook handed to the network library.
 *
 * \param size number of bytes requested
 * \return address of the block obtained from box_malloc(), or nullptr when
 *         zero bytes are requested
 */
static void *knet_malloc_func(size_t size) {
  if (size > 0) {
    return box_malloc(size);
  }
  return nullptr;
}
/**
 * Release hook handed to the network library.
 *
 * \param p address to release through box_free(); nullptr is ignored
 */
static void knet_free_func(void *p) {
  if (p == nullptr) {
    return;
  }
  box_free(p);
}

// Wires the two queue endpoints to the shared pair of one-way queues (each
// endpoint gets them in the opposite order) and routes the network library's
// memory management through the box allocator.
BoxNetwork::BoxNetwork()
  : main_queue(&net_to_main_queue, &main_to_net_queue),
    net_queue(&main_to_net_queue, &net_to_main_queue) {
  // Install the release hook and the allocation hook of the network library.
  knet_set_free_func(knet_free_func);
  knet_set_malloc_func(knet_malloc_func);
}

// Shuts the network down via stop() when the object is destroyed.
BoxNetwork::~BoxNetwork() {
  stop();
}

// Creates the network loop, registers the configuration reload listener and
// launches the network thread.
//
// Returns false when the loop could not be created (nothing else is started
// in that case), true otherwise.
auto BoxNetwork::start() -> bool {
  loop_ = knet_loop_create();
  if (loop_) {
    on_config_change();
    // Spin up the network thread.
    start_worker();
    return true;
  }
  return false;
}

// Stops the network thread, destroys the network loop and discards every
// event still waiting on the logic-thread side of the queue.
//
// Always returns true. Calling it again after a successful stop finds no
// joinable thread and no loop, so those steps are skipped.
auto BoxNetwork::stop() -> bool {
  running_ = false;
  // Wait for the network thread to exit. The join must not depend on
  // thread_exit_: the worker may already have finished (and set that flag)
  // by the time we get here, yet a finished std::thread is still joinable and
  // destroying a joinable std::thread calls std::terminate. Joining a thread
  // that has already finished returns immediately.
  if (thread_.joinable()) {
    thread_.join();
  }
  // Destroy the network loop.
  if (loop_) {
    knet_loop_destroy(loop_);
    loop_ = nullptr;
  }
  // Drop network events that were never processed, releasing their buffers.
  NetEventData event_data;
  while (main_queue.try_read(event_data)) {
    event_data.clear();
  }
  listener_name_map_.clear();
  return true;
}

auto BoxNetwork::start_worker() -> void {
  box_channel_recv_buffer_len_ = get_config().get_box_channel_recv_buffer_len();
  running_ = true;
  thread_ = std::thread([&]() {
    // 设置当前线程的网络类指针
    network_ = this;
    thread_exit_ = false;
    while (running_) {
      // 1. 运行网络主循环
      // 2. 处理队列的网络事件
      knet_loop_run_once(loop_);
      network_thread_processor();
    }
    // 清理网络线程
    worker_cleanup();
  });
}

auto BoxNetwork::worker_cleanup() -> void {
  // 减少所有现存管道引用计数
  for (const auto& it : thread_channel_map_) {
    knet_channel_ref_leave(it.second);
  }
  thread_channel_map_.clear();
  // 清理所有未处理的网络事件
  NetEventData event_data;
  while (true) {
    auto result = net_queue.try_read(event_data);
    if (!result) {
      break;
    }
    event_data.clear();
  }
  // 线程退出标志
  thread_exit_ = true;
  network_     = nullptr;
}

// Network thread: handles a read event on a channel. Copies everything
// currently available in the channel's stream into a freshly allocated buffer
// and forwards it to the logic thread as a recv_data_notify event. The channel
// is closed when the stream is empty, when the data cannot be popped, or when
// the event cannot be queued.
void do_channel_recv(kchannel_ref_t *channel) {
  box_assert(network_, !channel);

  NetEventData event_data;

  const auto uuid = knet_channel_ref_get_uuid(channel);
  auto *stream    = knet_channel_ref_get_stream(channel);

  box_assert(network_, !stream);

  const int size = knet_stream_available(stream);
  if (size == 0) {
    // A read event without any readable data: close the channel.
    knet_channel_ref_close(channel);
    return;
  }

  auto &notify = event_data.recv_data_notify;
  event_data.event_id = NetEvent::recv_data_notify;
  notify.channel_id = uuid;
  notify.data_ptr = box_malloc(size);

  box_assert(network_, !event_data.recv_data_notify.data_ptr);

  notify.length = size;

  // The event is only forwarded when the payload was popped successfully;
  // a failure at either step releases the buffer and closes the channel.
  const bool popped =
    (error_ok == knet_stream_pop(stream, notify.data_ptr, size));
  const bool delivered = popped && network_->get_net_queue().send(event_data);
  if (!delivered) {
    event_data.clear();
    knet_channel_ref_close(channel);
  }
}

// Network thread: handles a close event on a channel. Releases the shared
// reference kept in the worker channel map (if any) and tells the logic
// thread that the channel is gone.
void do_channel_close(kchannel_ref_t *channel) {
  box_assert(network_, !channel);

  const auto uuid = knet_channel_ref_get_uuid(channel);
  auto &channels  = network_->get_worker_channel_map();
  auto it = channels.find(uuid);
  if (it != channels.end()) {
    box_assert(network_, !it->second);

    // Registered channel: drop our reference and forget it.
    knet_channel_ref_leave(it->second);
    channels.erase(it);
  }

  // Notify the logic thread.
  NetEventData event_data;
  event_data.event_id = NetEvent::close_notify;
  event_data.close_notify.channel_id = uuid;
  if (network_->get_net_queue().send(event_data)) {
    return;
  }
  // The notification could not be queued; release the event.
  event_data.clear();
}

// Network thread: close handling for channels created by a connector. Frees
// the connector name stored in the channel's user pointer (set by
// do_connect_request) before running the common close handling.
void do_connector_channel_close(kchannel_ref_t *channel) {
  box_assert(network_, !channel);

  auto *connector_name =
    reinterpret_cast<char *>(knet_channel_ref_get_ptr(channel));
  if (connector_name != nullptr) {
    box_free(connector_name);
    // Clear the user pointer so the freed name cannot be reached again.
    knet_channel_ref_set_ptr(channel, nullptr);
  }
  do_channel_close(channel);
}

// Network thread: event callback for channels created by a listener. A read
// event takes precedence; the close branch only runs when the read flag is
// not set.
void client_cb(kchannel_ref_t *channel, knet_channel_cb_event_e e) {
  if (e & channel_cb_event_recv) {
    do_channel_recv(channel);
    return;
  }
  if (e & channel_cb_event_close) {
    do_channel_close(channel);
  }
}

// Network thread: a listener accepted a connection. Registers the new channel
// in the worker channel map, installs client_cb on it and sends an
// accept_notify event (carrying a copy of the listener's name when it can be
// resolved) to the logic thread. The registration is rolled back when the
// channel carries no user pointer or the event cannot be queued.
void do_accept_channel(kchannel_ref_t *channel) {
  box_assert(network_, !channel);

  const auto uuid     = knet_channel_ref_get_uuid(channel);
  auto channel_shared = knet_channel_ref_share(channel);

  box_assert(network_, !channel_shared);

  auto &worker_channels = network_->get_worker_channel_map();
  worker_channels[uuid] = channel_shared;
  knet_channel_ref_set_cb(channel, client_cb);

  // Build the notification for the logic thread.
  NetEventData event_data;
  event_data.event_id = NetEvent::accept_notify;
  event_data.accept_notify.channel_id = uuid;
  // The user pointer is read as the uuid of the listening channel (see
  // do_listen_request, which stores such a pointer on the listener).
  auto *uuid_ptr = knet_channel_ref_get_ptr(channel);

  box_assert(network_, !uuid_ptr);

  bool delivered = false;
  if (uuid_ptr) {
    const auto listen_channel_uuid =
      *reinterpret_cast<std::uint64_t *>(uuid_ptr);
    auto &listener_map = network_->get_listener_name_map();
    const auto found = listener_map.find(listen_channel_uuid);
    if (found != listener_map.end()) {
      // Hand the logic thread its own copy of the listener name.
      const auto &listener_name = found->second.second;
      const auto bytes = listener_name.size() + 1;
      event_data.accept_notify.name = box_malloc(bytes);
      box_assert(network_, !event_data.accept_notify.name);

      memcpy(event_data.accept_notify.name, listener_name.c_str(), bytes);
    }
    delivered = network_->get_net_queue().send(event_data);
  }
  if (!delivered) {
    // Internal error or the event could not be queued: undo the registration.
    knet_channel_ref_leave(channel_shared);
    worker_channels.erase(uuid);
    event_data.clear();
  }
}

// Network thread: event callback of a listener channel. Only accept events
// are handled; everything else is ignored.
void acceptor_cb(kchannel_ref_t *channel, knet_channel_cb_event_e e) {
  if (!(e & channel_cb_event_accept)) {
    return;
  }
  do_accept_channel(channel);
}

// Network thread: handles the outcome of a connect attempt.
//
// On success the channel is shared and registered in the worker channel map.
// In both cases a connect_response event carrying a copy of the connector
// name (taken from the channel's user pointer) is sent to the logic thread;
// its success flag is true only when the connection was established and the
// channel could be registered.
//
// \param channel the connector's channel
// \param success true for a connect event, false for a connect timeout
void do_connect_channel(kchannel_ref_t *channel, bool success) {
  box_assert(network_, !channel);

  auto uuid = knet_channel_ref_get_uuid(channel);
  kchannel_ref_t *channel_shared = nullptr;
  bool update_success = true;
  // True only when this call inserted the entry for uuid into the map.
  bool registered = false;
  if (success) {
    channel_shared = knet_channel_ref_share(channel);
    box_assert(network_, !channel_shared);
    update_success = network_->add_worker_channel(uuid, channel_shared);
    box_assert(network_, !update_success);
    registered = update_success;
    if (!registered && channel_shared) {
      // The map did not take the reference, so nobody else will ever release
      // it; give it back now instead of leaking it.
      knet_channel_ref_leave(channel_shared);
      channel_shared = nullptr;
    }
  }
  NetEventData event_data;
  auto *name_ptr = reinterpret_cast<char *>(knet_channel_ref_get_ptr(channel));

  box_assert(network_, !name_ptr);

  // Copy the connector name including its terminator.
  auto length = std::strlen(name_ptr) + 1;
  event_data.event_id = NetEvent::connect_response;
  event_data.connect_response.name = box_malloc(length);
  memcpy(event_data.connect_response.name, name_ptr, length);
  event_data.connect_response.channel_id = uuid;
  event_data.connect_response.success = (success && update_success);
  if (!network_->get_net_queue().send(event_data)) {
    // The response could not be queued: release everything this call created.
    event_data.clear();
    if (registered) {
      // Only remove the entry we inserted; a pre-existing entry with the same
      // id belongs to somebody else.
      network_->get_worker_channel_map().erase(uuid);
    }
    if (channel_shared) {
      knet_channel_ref_leave(channel_shared);
    }
  }
}

// Network thread: event callback for connector channels. Exactly one handler
// runs per invocation, checked in this order: connect, recv, connect timeout,
// close.
void connector_cb(kchannel_ref_t *channel, knet_channel_cb_event_e e) {
  if (e & channel_cb_event_connect) {
    do_connect_channel(channel, true);
    return;
  }
  if (e & channel_cb_event_recv) {
    do_channel_recv(channel);
    return;
  }
  if (e & channel_cb_event_connect_timeout) {
    do_connect_channel(channel, false);
    return;
  }
  if (e & channel_cb_event_close) {
    do_connector_channel_close(channel);
  }
}

// 网络线程内，处理建立监听器事件
void do_listen_request(const NetEventData &request, kloop_t *loop) {
  auto* channel = knet_loop_create_channel(
    loop,
    0, // 已废弃
    network_->get_channel_recv_buffer_len()
  );
  box_assert(network_, !channel);

  auto uuid = knet_channel_ref_get_uuid(channel);
  box_assert(network_, !uuid);

  auto retval = knet_channel_ref_accept(
    channel,
    request.listen_request.host,
    request.listen_request.port,
    512 // TODO configurable
  );

  box_assert(network_, !request.listen_request.name);
  std::string listener_name(request.listen_request.name);

  NetEventData response;
  auto length = static_cast<int>(std::strlen(request.listen_request.name) + 1);
  response.listen_response.name = box_malloc(length);
  box_assert(network_, !response.listen_response.name);

  memcpy(response.listen_response.name, request.listen_request.name, length);
  response.event_id = NetEvent::listen_response;
  if (error_ok != retval) {
    response.listen_response.channel_id = 0;
    response.listen_response.success = false;
    knet_channel_ref_close(channel);
  } else {
    auto* shared_channel = knet_channel_ref_share(channel);
    box_assert(network_, !shared_channel);

    network_->get_worker_channel_map()[uuid] = shared_channel;
    response.listen_response.channel_id = uuid;
    response.listen_response.success = true;
    knet_channel_ref_set_cb(channel, acceptor_cb);
  }
  if (!network_->get_net_queue().send(response)) {
    // 发送失败，清理资源
    // 发生这种情况，会导致逻辑线程无法得知监听器是否启动成功
    response.clear();
    knet_channel_ref_close(channel);
  } else {
    if (error_ok == retval) {
      network_->get_listener_name_map()[uuid] = {uuid, listener_name};
      auto *uuid_ptr = &(network_->get_listener_name_map()[uuid].first);
      knet_channel_ref_set_ptr(channel, reinterpret_cast<void *>(uuid_ptr));
    }
  }
}

// Network thread: handles a request from the logic thread to open a connector.
//
// Creates a channel and starts connecting to the requested host/port.
// - If the connect call fails immediately, a failed connect_response carrying
//   a copy of the connector name is sent to the logic thread and the channel
//   is closed.
// - Otherwise connector_cb is installed and a copy of the connector name is
//   stored in the channel's user pointer; it is read back by
//   do_connect_channel and freed by do_connector_channel_close.
void do_connect_request(const NetEventData &request, kloop_t *loop) {
  box_assert(network_, !request.connect_request.host);
  box_assert(network_, !request.connect_request.name);

  auto channel = knet_loop_create_channel(
    loop,
    0, // deprecated
    network_->get_channel_recv_buffer_len()
  );
  box_assert(network_, !channel);

  auto retval = knet_channel_ref_connect(
    channel,
    request.connect_request.host,
    request.connect_request.port,
    request.connect_request.timeout
  );
  // Both branches need a private copy of the name, terminator included.
  const auto length = std::strlen(request.connect_request.name) + 1;
  if (error_ok != retval) {
    NetEventData response;
    response.event_id = NetEvent::connect_response;
    response.connect_response.name = box_malloc(length);
    // Check the buffer that was just allocated (the original asserted on the
    // request object instead of the response).
    box_assert(network_, !response.connect_response.name);

    memcpy(
      response.connect_response.name,
      request.connect_request.name,
      length
    );
    response.connect_response.channel_id = 0;
    response.connect_response.success    = false;
    if (!network_->get_net_queue().send(response)) {
      // The response could not be queued; release its buffers.
      response.clear();
    }
    // The channel is of no use after a failed connect. Close it whether or
    // not the response was queued, as do_listen_request does for a failed
    // accept, so it is not left behind.
    knet_channel_ref_close(channel);
  } else {
    knet_channel_ref_set_cb(channel, connector_cb);
    auto *name = box_malloc(length);
    box_assert(network_, !name);

    if (name) {
      memcpy(name, request.connect_request.name, length);
      knet_channel_ref_set_ptr(channel, name);
    } else {
      knet_channel_ref_close(channel);
    }
  }
}

// Network thread: handles a send request from the logic thread. Pushes the
// payload into the stream of the target channel; requests for unknown
// channels are dropped, and the channel is closed when the push fails.
// The loop parameter is unused.
void do_send_request(const NetEventData &request, kloop_t *loop) {
  box_assert(network_, !request.send_data_notify.data_ptr);

  const auto &payload = request.send_data_notify;
  auto &channels = network_->get_worker_channel_map();
  const auto found = channels.find(payload.channel_id);
  if (found == channels.end()) {
    // The channel is no longer registered; nothing to send to.
    return;
  }

  auto *channel_ref = found->second;
  auto *stream = knet_channel_ref_get_stream(channel_ref);
  box_assert(network_, !stream);

  if (error_ok != knet_stream_push(stream, payload.data_ptr, payload.length)) {
    knet_channel_ref_close(channel_ref);
  }
}

// Network thread: handles a close request from the logic thread. Closes the
// channel if it is still registered; unknown ids are ignored. The loop
// parameter is unused.
void do_close_request(const NetEventData &request, kloop_t *loop) {
  auto &channels = network_->get_worker_channel_map();
  const auto found = channels.find(request.close_request.channel_id);
  if (found == channels.end()) {
    return;
  }
  knet_channel_ref_close(found->second);
}

// Network thread: routes one request coming from the logic thread to its
// handler. Event ids without a handler are ignored.
auto BoxNetwork::do_event_worker(NetEventData &event_data) -> void {
  switch (event_data.event_id) {
  case NetEvent::listen_request:
    do_listen_request(event_data, loop_);
    break;
  case NetEvent::connect_request:
    do_connect_request(event_data, loop_);
    break;
  case NetEvent::send_data_notify:
    do_send_request(event_data, loop_);
    break;
  case NetEvent::close_request:
    do_close_request(event_data, loop_);
    break;
  default:
    break;
  }
}

auto BoxNetwork::on_config_change() -> void {
  get_config().add_reload_listener(
    "network", [&](const std::string&, const config::BoxConfig &new_config) {
      const auto &old_config = get_config();
      if (old_config.get_box_channel_recv_buffer_len() !=
          new_config.get_box_channel_recv_buffer_len()) {
        box_channel_recv_buffer_len_ =
          new_config.get_box_channel_recv_buffer_len();
        std::string reason1 =
          "box_channel_recv_buffer_len = " +
          std::to_string(old_config.get_box_channel_recv_buffer_len());
        std::string reason2 =
          "box_channel_recv_buffer_len = " +
          std::to_string(new_config.get_box_channel_recv_buffer_len());
        write_log(
          lang::LangID::LANG_RELOAD_INFO,
          klogger::Logger::WARNING,
          reason1.c_str(),
          reason2.c_str()
        );
      }
    });
}

// Network thread: drains the request queue. Every request is dispatched
// through do_event_worker(); std::exception-derived errors are logged and do
// not stop the loop. The request's buffers are released after handling.
void BoxNetwork::network_thread_processor() {
  for (;;) {
    NetEventData event_data;
    if (!net_queue.try_read(event_data)) {
      // Queue is empty.
      return;
    }
    try {
      do_event_worker(event_data);
    } catch (const std::exception &e) {
      write_log(
        lang::LangID::LANG_UNEXPECTED_EXCEPTION,
        klogger::Logger::FAILURE,
        "BoxNetwork",
        util::demangle(typeid(e).name()).c_str(),
        e.what()
      );
    }
    event_data.clear();
  }
}

// Logic thread: asks the network thread to open a listener.
//
// \param name listener name, must not be empty
// \param host host to listen on; it must resolve (util::get_host_ip) to
//             something util::is_ip_address accepts
// \param port port to listen on, must not be 0
// \return true when the request was queued for the network thread, false when
//         an argument is rejected or the queue refuses the request
//
// NOTE(review): the resolved ip is only used for validation; the request
// carries the original host string. Confirm that this is intended.
auto BoxNetwork::listen_at(
  const std::string name,
  const std::string &host,
  int port) -> bool {

  auto ip = util::get_host_ip(host);
  if (!util::is_ip_address(ip) || (port == 0) || (name.empty())) {
    return false;
  }
  // name is known to be non-empty here, so no fallback name is needed (the
  // former "unknown" substitution could never run).
  NetEventData event_data;
  event_data.event_id = NetEvent::listen_request;
  event_data.listen_request.name = box_malloc(name.size() + 1);
  box_assert(this, !event_data.listen_request.name);

  event_data.listen_request.host = box_malloc(host.size() + 1);
  box_assert(this, !event_data.listen_request.host);

  event_data.listen_request.port = port;
  // c_str() is null-terminated, so copying size() + 1 bytes includes the
  // terminator.
  memcpy(event_data.listen_request.name, name.c_str(), name.size() + 1);
  memcpy(event_data.listen_request.host, host.c_str(), host.size() + 1);
  // Hand the request over to the network thread.
  if (!main_queue.send(event_data)) {
    event_data.clear();
    return false;
  }
  return true;
}

// Logic thread: asks the network thread to open a connector.
//
// \param name    connector name, must not be empty
// \param host    remote host; it must resolve (util::get_host_ip) to
//                something util::is_ip_address accepts
// \param port    remote port, must not be 0
// \param timeout connect timeout, forwarded unchanged in the request
// \return true when the request was queued for the network thread, false when
//         an argument is rejected or the queue refuses the request
auto BoxNetwork::connect_to(
  const std::string name,
  const std::string &host,
  int port,
  int timeout) -> bool {

  auto ip = util::get_host_ip(host);
  const bool acceptable =
    util::is_ip_address(ip) && (port != 0) && !name.empty();
  if (!acceptable) {
    return false;
  }

  const auto host_bytes = host.size() + 1;
  const auto name_bytes = name.size() + 1;

  NetEventData event_data;
  event_data.event_id = NetEvent::connect_request;
  event_data.connect_request.host = box_malloc(host_bytes);
  box_assert(this, !event_data.connect_request.host);

  event_data.connect_request.name = box_malloc(name_bytes);
  box_assert(this, !event_data.connect_request.name);

  event_data.connect_request.port = port;
  event_data.connect_request.timeout = timeout;
  // Copy both strings together with their terminators.
  memcpy(event_data.connect_request.host, host.c_str(), host_bytes);
  memcpy(event_data.connect_request.name, name.c_str(), name_bytes);

  // Hand the request over to the network thread.
  if (main_queue.send(event_data)) {
    return true;
  }
  event_data.clear();
  return false;
}

// Requests that the channel with the given id be closed; forwards to
// enqueue_close_request() and returns its result.
auto BoxNetwork::close_channel(std::uint64_t id) -> bool {
  return enqueue_close_request(id);
}

// Logic thread: dispatches one event received from the network thread to the
// matching on_listen / on_accept / on_close / on_connect / on_data hook and
// keeps main_channel_map_ in sync with the channels reported by the network
// thread. Events with any other id are ignored. The event's buffers are not
// released here; update() clears the event after this call returns.
auto BoxNetwork::do_event_main(NetEventData &event_data) -> void {
  switch (event_data.event_id) {
  case NetEvent::listen_response: {
    // Result of a listen_at() request.
    auto id = event_data.listen_response.channel_id;
    if (event_data.listen_response.success) {
      auto ptr = make_shared_pool_ptr<BoxChannel>(
        id, this, event_data.listen_response.name);

      box_assert(this, !ptr);

      main_channel_map_[id] = ptr;
      on_listen(event_data.listen_response.name, true, ptr);
    } else {
      on_listen(event_data.listen_response.name, false, NullChannel);
    }
  } break;
  case NetEvent::accept_notify: {
    // A listener accepted a new connection.
    auto id  = event_data.accept_notify.channel_id;
    auto ptr = make_shared_pool_ptr<BoxChannel>(
      id, this, event_data.accept_notify.name);

    box_assert(this, !id);
    box_assert(this, !ptr);

    main_channel_map_[id] = ptr;
    on_accept(ptr);
  } break;
  case NetEvent::close_notify: {
    // A channel was closed on the network thread.
    auto id = event_data.close_notify.channel_id;
    box_assert(this, !id);

    auto it = main_channel_map_.find(id);
    if (it == main_channel_map_.end()) {
      break;
    }
    // Copy the pointer so the channel stays alive until the entry is erased.
    auto ptr = it->second;

    box_assert(this, !ptr);
    // Run the close hook.
    on_close(ptr);
    // Mark the channel as closed.
    ptr->set_close_flag();
    // Update the logic-thread channel table.
    main_channel_map_.erase(it);
  } break;
  case NetEvent::connect_response: {
    // Result of a connect_to() request.
    if (event_data.connect_response.success) {
      auto id = event_data.connect_response.channel_id;
      box_assert(this, !id);

      auto ptr = make_shared_pool_ptr<BoxChannel>(
        id, this, event_data.connect_response.name);

      box_assert(this, !ptr);

      // Update the logic-thread channel table.
      main_channel_map_[id] = ptr;
      // Success.
      on_connect(event_data.connect_response.name, true, ptr);
    } else {
      // Failure.
      on_connect(event_data.connect_response.name, false, NullChannel);
    }
  } break;
  case NetEvent::recv_data_notify: {
    // Data arrived on a channel; unknown channel ids are ignored.
    auto id = event_data.recv_data_notify.channel_id;
    box_assert(this, !id);

    auto it = main_channel_map_.find(id);
    if (it == main_channel_map_.end()) {
      break;
    }
    auto ptr = it->second;

    box_assert(this, !ptr);

    auto bytes = ptr->write_buffer(
      event_data.recv_data_notify.data_ptr,
      event_data.recv_data_notify.length
    );
    // Close the channel when its buffer did not take the whole payload.
    if (bytes != event_data.recv_data_notify.length) {
      ptr->close();
    } else {
      on_data(ptr);
    }
  } break;
  default:
    break;
  }
}

// Logic thread: processes every event currently queued by the network thread.
// Each event goes through do_event_main(); std::exception-derived errors are
// logged and do not stop the loop. The event's buffers are released after
// handling.
auto BoxNetwork::update() -> void {
  NetEventData event_data;
  for (;;) {
    if (!main_queue.try_read(event_data)) {
      break;
    }
    try {
      do_event_main(event_data);
    } catch (const std::exception &e) {
      write_log(
        lang::LangID::LANG_UNEXPECTED_EXCEPTION,
        klogger::Logger::FAILURE,
        "BoxNetwork",
        util::demangle(typeid(e).name()).c_str(),
        e.what()
      );
    }
    event_data.clear();
  }
}

// Looks up a channel in the logic-thread channel table.
// Returns a reference to the stored pointer, or to the empty NullChannel when
// the id is not registered.
auto BoxNetwork::get_channel(std::uint64_t id)
  const noexcept-> const std::shared_ptr<BoxChannel> & {
  const auto found = main_channel_map_.find(id);
  if (found != main_channel_map_.end()) {
    return found->second;
  }
  return NullChannel;
}

BoxNetwork::SPSCQueuePair &BoxNetwork::get_net_queue() noexcept  {
  return net_queue;
}

BoxNetwork::SPSCQueuePair &BoxNetwork::get_main_queue() noexcept {
  return main_queue;
}

BoxNetwork::ChannelMap &BoxNetwork::get_worker_channel_map() noexcept {
  return thread_channel_map_;
}

// Registers a channel reference under channel_id in the worker channel map.
// Returns false, leaving the map untouched, when the id is already present.
auto BoxNetwork::add_worker_channel(std::uint64_t channel_id,
  kchannel_ref_t* channel_ref) -> bool {
  const auto inserted = thread_channel_map_.emplace(channel_id, channel_ref);
  return inserted.second;
}

// Table of active listeners: listening channel uuid -> (uuid, listener name).
// Filled by do_listen_request and read by do_accept_channel.
auto BoxNetwork::get_listener_name_map() noexcept -> ListenerNameMap & {
  return listener_name_map_;
}

// Receive buffer length passed to knet_loop_create_channel() for new
// channels; follows the box_channel_recv_buffer_len configuration value.
auto BoxNetwork::get_channel_recv_buffer_len() const noexcept -> int {
  return box_channel_recv_buffer_len_;
}

// Logic thread: queues a copy of the given bytes for sending on a channel.
//
// \param id   target channel id
// \param data bytes to send
// \param size number of bytes
// \return size when the request was queued, 0 when the queue refused it
auto BoxNetwork::enqueue_send_request(
  std::uint64_t id,
  const char *data,
  int size) -> int {

  box_assert(this, !id);
  box_assert(this, !data);
  box_assert(this, !size);

  NetEventData event;
  event.event_id = NetEvent::send_data_notify;

  auto &payload = event.send_data_notify;
  payload.channel_id = id;
  payload.data_ptr = box_malloc(size);
  box_assert(this, !event.send_data_notify.data_ptr);

  payload.length = size;
  memcpy(payload.data_ptr, data, size);

  // Hand the request over to the network thread.
  if (main_queue.send(event)) {
    return size;
  }
  // The queue refused the request; release the copied payload.
  event.clear();
  return 0;
}

bool BoxNetwork::enqueue_close_request(std::uint64_t id) {
  box_assert(this, !id);

  NetEventData event;
  event.event_id = NetEvent::close_request;
  event.close_request.channel_id = id;
  return main_queue.send(event);
}

} // namespace service
} // namespace kratos
